use std::{
    collections::{HashMap, HashSet},
    sync::Arc,
};

use futures::future::try_join_all;
use lakekeeper::{
    ProjectId, WarehouseId,
    api::{ApiContext, IcebergErrorResponse, RequestMetadata, management::v1::role::Role},
    async_trait,
    axum::Router,
    service::{
        Actor, AuthZTableInfo, AuthZViewInfo, CatalogStore, ErrorModel, NamespaceHierarchy,
        NamespaceId, NamespaceWithParent, ResolvedWarehouse, RoleId, SecretStore, ServerId, State,
        TableId, UserId, ViewId,
        authz::{
            AuthorizationBackendUnavailable, Authorizer, CannotInspectPermissions,
            CatalogProjectAction, CatalogUserAction, IsAllowedActionError, ListProjectsResponse,
            NamespaceParent, UserOrRole,
        },
        health::Health,
    },
    tokio::sync::RwLock,
};
use openfga_client::{
    client::{
        BasicOpenFgaClient, BatchCheckItem, CheckRequestTupleKey, ConsistencyPreference,
        ReadRequestTupleKey, ReadResponse, Tuple, TupleKey, TupleKeyWithoutCondition,
        batch_check_single_result::CheckResult,
    },
    tonic,
};
#[cfg(feature = "open-api")]
use utoipa::OpenApi as _;

use crate::{
    AUTH_CONFIG, FgaType, MAX_TUPLES_PER_WRITE,
    entities::{OpenFgaEntity, ParseOpenFgaEntity},
    error::{
        BatchCheckError, MissingItemInBatchCheck, OpenFGABackendUnavailable, OpenFGAError,
        OpenFGAResult, UnexpectedCorrelationId,
    },
    models::OpenFgaType,
    relations::{
        self, NamespaceRelation, OpenFgaRelation, ProjectRelation, ReducedRelation, RoleRelation,
        ServerRelation, TableRelation, ViewRelation, WarehouseRelation,
    },
};

type AuthorizerResult<T> = std::result::Result<T, IcebergErrorResponse>;

#[derive(Clone, Debug)]
pub struct OpenFGAAuthorizer {
    pub(crate) client: BasicOpenFgaClient,
    client_higher_consistency: BasicOpenFgaClient,
    pub(crate) health: Arc<RwLock<Vec<Health>>>,
    server_id: ServerId,
}

impl OpenFGAAuthorizer {
    pub fn new(client: BasicOpenFgaClient, server_id: ServerId) -> Self {
        let client_higher_consistency = client
            .clone()
            .set_consistency(ConsistencyPreference::HigherConsistency);
        Self {
            client,
            client_higher_consistency,
            health: Arc::new(RwLock::new(vec![])),
            server_id,
        }
    }
}

/// Implements batch checks for the `are_allowed_x_actions` methods.
#[async_trait::async_trait]
impl Authorizer for OpenFGAAuthorizer {
    type ServerAction = ServerRelation;
    type ProjectAction = ProjectRelation;
    type WarehouseAction = WarehouseRelation;
    type NamespaceAction = NamespaceRelation;
    type TableAction = TableRelation;
    type ViewAction = ViewRelation;
    type UserAction = CatalogUserAction;
    type RoleAction = RoleRelation;

    fn implementation_name() -> &'static str {
        "openfga"
    }

    fn server_id(&self) -> ServerId {
        self.server_id
    }

    #[cfg(feature = "open-api")]
    fn api_doc() -> utoipa::openapi::OpenApi {
        crate::api::ApiDoc::openapi()
    }

    fn new_router<C: CatalogStore, S: SecretStore>(&self) -> Router<ApiContext<State<Self, C, S>>> {
        crate::api::new_v1_router()
    }

    async fn check_assume_role_impl(
        &self,
        principal: &UserId,
        assumed_role: RoleId,
    ) -> Result<bool, AuthorizationBackendUnavailable> {
        self.check(CheckRequestTupleKey {
            user: Actor::Principal(principal.clone()).to_openfga(),
            relation: relations::RoleRelation::CanAssume.to_string(),
            object: assumed_role.to_openfga(),
        })
        .await
        .map_err(Into::into)
    }

    async fn can_bootstrap(&self, metadata: &RequestMetadata) -> AuthorizerResult<()> {
        let actor = metadata.actor();
        // We don't check the actor as assumed roles are irrelevant for bootstrapping.
        // The principal is the only relevant actor.
        if &Actor::Anonymous == actor {
            return Err(ErrorModel::unauthorized(
                "Anonymous users cannot bootstrap the catalog",
                "AnonymousBootstrap",
                None,
            )
            .into());
        }
        Ok(())
    }

    async fn bootstrap(
        &self,
        metadata: &RequestMetadata,
        is_operator: bool,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();
        // We don't check the actor as assumed roles are irrelevant for bootstrapping.
        // The principal is the only relevant actor.
        let user = match actor {
            Actor::Principal(principal) | Actor::Role { principal, .. } => principal,
            Actor::Anonymous => {
                return Err(ErrorModel::internal(
                    "can_bootstrap should be called before bootstrap",
                    "AnonymousBootstrap",
                    None,
                )
                .into());
            }
        };

        let relation = if is_operator {
            ServerRelation::Operator
        } else {
            ServerRelation::Admin
        };

        self.write(
            Some(vec![TupleKey {
                user: user.to_openfga(),
                relation: relation.to_string(),
                object: self.openfga_server().clone(),
                condition: None,
            }]),
            None,
        )
        .await?;

        Ok(())
    }

    async fn list_projects_impl(
        &self,
        metadata: &RequestMetadata,
    ) -> Result<ListProjectsResponse, AuthorizationBackendUnavailable> {
        let actor = metadata.actor();
        self.list_projects_internal(actor).await.map_err(Into::into)
    }

    async fn can_search_users_impl(
        &self,
        metadata: &RequestMetadata,
    ) -> Result<bool, AuthorizationBackendUnavailable> {
        // Currently all authenticated principals can search users
        Ok(metadata.actor().is_authenticated())
    }

    async fn are_allowed_role_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        roles_with_actions: &[(&Role, Self::RoleAction)],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        // Every authenticated user can read role metadata.
        // This does not include assignments to the role.
        // Used for cross-project role get so that we can show role names and not just IDs.

        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        // Separate CanRead actions from others to avoid unnecessary batch checks
        let mut results = Vec::with_capacity(roles_with_actions.len());
        let mut batch_items = Vec::new();
        let mut batch_indices = Vec::new();
        for (idx, (role, action)) in roles_with_actions.iter().enumerate() {
            if *action == RoleRelation::CanReadMetadata {
                results.push((idx, true));
            } else {
                batch_indices.push(idx);
                batch_items.push(CheckRequestTupleKey {
                    user: user.clone(),
                    relation: action.to_string(),
                    object: role.id.to_openfga(),
                });
            }
        }

        // Only perform batch check if there are non-CanRead actions
        if !batch_items.is_empty() {
            let guard_tuples = if for_user.is_some() {
                // Collect unique role objects for permission checks
                let unique_roles: HashSet<_> = roles_with_actions
                    .iter()
                    .filter(|(_, action)| *action != RoleRelation::CanReadMetadata)
                    .map(|(role, _)| role.id.to_openfga())
                    .collect();

                unique_roles
                    .into_iter()
                    .map(|role_obj| CheckRequestTupleKey {
                        user: metadata.actor().to_openfga(),
                        relation: RoleRelation::CanReadAssignments.to_string(),
                        object: role_obj,
                    })
                    .collect()
            } else {
                vec![]
            };

            let batch_results = self
                .check_actions_with_permission_guard(metadata.actor(), batch_items, guard_tuples)
                .await?;

            for (batch_idx, result) in batch_results.iter().enumerate() {
                results.push((batch_indices[batch_idx], *result));
            }
        }

        // Sort by original index and extract boolean values
        results.sort_by_key(|(idx, _)| *idx);
        Ok(results.into_iter().map(|(_, allowed)| allowed).collect())
    }

    async fn are_allowed_user_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        users_with_actions: &[(&UserId, Self::UserAction)],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let actor_principal = match metadata.actor() {
            Actor::Role {
                principal,
                assumed_role: _,
            }
            | Actor::Principal(principal) => Some(principal),
            Actor::Anonymous => None,
        };

        let mut results = Vec::with_capacity(users_with_actions.len());
        let mut batch_indices = Vec::new();

        for (idx, (user_id, action)) in users_with_actions.iter().enumerate() {
            // 1. Users can perform all actions on themselves
            // 2. Every authenticated user can read user metadata given the user id
            let is_same_user = for_user.is_none() && (actor_principal == Some(*user_id));
            if is_same_user || *action == CatalogUserAction::Read {
                results.push((idx, true));
            } else {
                batch_indices.push((idx, *action));
            }
        }

        if !batch_indices.is_empty() {
            let server_id = self.openfga_server().clone();
            let actor_openfga = metadata.actor().to_openfga();
            let user = for_user.map_or_else(|| actor_openfga.clone(), OpenFgaEntity::to_openfga);

            let batch_results = self
                .batch_check(vec![
                    CheckRequestTupleKey {
                        user: actor_openfga.clone(),
                        relation: ServerRelation::CanListUsers.to_string(),
                        object: server_id.clone(),
                    },
                    CheckRequestTupleKey {
                        user: user.clone(),
                        relation: ServerRelation::CanUpdateUsers.to_string(),
                        object: server_id.clone(),
                    },
                    CheckRequestTupleKey {
                        user,
                        relation: ServerRelation::CanDeleteUsers.to_string(),
                        object: server_id.clone(),
                    },
                ])
                .await?;

            let is_allowed_to_know = batch_results[0];
            let can_update = batch_results[1];
            let can_delete = batch_results[2];

            if for_user.is_some() && !is_allowed_to_know {
                return Err(
                    CannotInspectPermissions::new(metadata.actor().clone(), &server_id).into(),
                );
            }

            for (idx, action) in batch_indices {
                let allowed = match action {
                    CatalogUserAction::Read => true,
                    CatalogUserAction::Update => can_update,
                    CatalogUserAction::Delete => can_delete,
                };
                results.push((idx, allowed));
            }
        }

        results.sort_by_key(|(idx, _)| *idx);
        Ok(results.into_iter().map(|(_, allowed)| allowed).collect())
    }

    async fn are_allowed_server_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        actions: &[Self::ServerAction],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);
        let object = self.openfga_server().clone();

        let items: Vec<_> = actions
            .iter()
            .map(|a| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: object.clone(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            vec![CheckRequestTupleKey {
                user: metadata.actor().to_openfga(),
                relation: ServerRelation::CanReadAssignments.to_string(),
                object: object.clone(),
            }]
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn are_allowed_project_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        projects_with_actions: &[(&ProjectId, Self::ProjectAction)],
    ) -> std::result::Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        let items: Vec<_> = projects_with_actions
            .iter()
            .map(|(project, a)| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: project.to_openfga(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            // Collect unique project objects for permission checks
            let unique_projects: HashSet<_> = projects_with_actions
                .iter()
                .map(|(project, _)| project.to_openfga())
                .collect();

            unique_projects
                .into_iter()
                .map(|project_obj| CheckRequestTupleKey {
                    user: metadata.actor().to_openfga(),
                    relation: ProjectRelation::CanReadAssignments.to_string(),
                    object: project_obj,
                })
                .collect()
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn are_allowed_warehouse_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        warehouses_with_actions: &[(&ResolvedWarehouse, Self::WarehouseAction)],
    ) -> std::result::Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        let items: Vec<_> = warehouses_with_actions
            .iter()
            .map(|(wh, a)| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: wh.warehouse_id.to_openfga(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            // Collect unique warehouse objects for permission checks
            let unique_warehouses: HashSet<_> = warehouses_with_actions
                .iter()
                .map(|(wh, _)| wh.warehouse_id.to_openfga())
                .collect();

            unique_warehouses
                .into_iter()
                .map(|warehouse_obj| CheckRequestTupleKey {
                    user: metadata.actor().to_openfga(),
                    relation: WarehouseRelation::CanReadAssignments.to_string(),
                    object: warehouse_obj,
                })
                .collect()
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn are_allowed_namespace_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        _warehouse: &ResolvedWarehouse,
        actions: &[(&NamespaceHierarchy, Self::NamespaceAction)],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        let items: Vec<_> = actions
            .iter()
            .map(|(namespace, a)| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: namespace.namespace_id().to_openfga(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            // Collect unique namespace objects for permission checks
            let unique_namespaces: HashSet<_> = actions
                .iter()
                .map(|(namespace, _)| namespace.namespace_id().to_openfga())
                .collect();

            unique_namespaces
                .into_iter()
                .map(|namespace_obj| CheckRequestTupleKey {
                    user: metadata.actor().to_openfga(),
                    relation: NamespaceRelation::CanReadAssignments.to_string(),
                    object: namespace_obj,
                })
                .collect()
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn are_allowed_table_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        _warehouse: &ResolvedWarehouse,
        _parent_namespaces: &HashMap<NamespaceId, NamespaceWithParent>,
        tables_with_actions: &[(
            &NamespaceWithParent,
            &impl AuthZTableInfo,
            Self::TableAction,
        )],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        let items: Vec<_> = tables_with_actions
            .iter()
            .map(|(_ns, table, a)| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: (table.warehouse_id(), table.table_id()).to_openfga(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            // Collect unique table objects for permission checks
            let unique_tables: HashSet<_> = tables_with_actions
                .iter()
                .map(|(_ns, table, _)| (table.warehouse_id(), table.table_id()).to_openfga())
                .collect();

            unique_tables
                .into_iter()
                .map(|table_obj| CheckRequestTupleKey {
                    user: metadata.actor().to_openfga(),
                    relation: TableRelation::CanReadAssignments.to_string(),
                    object: table_obj,
                })
                .collect()
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn are_allowed_view_actions_impl(
        &self,
        metadata: &RequestMetadata,
        for_user: Option<&UserOrRole>,
        _warehouse: &ResolvedWarehouse,
        _parent_namespaces: &HashMap<NamespaceId, NamespaceWithParent>,
        views_with_actions: &[(&NamespaceWithParent, &impl AuthZViewInfo, Self::ViewAction)],
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let user =
            for_user.map_or_else(|| metadata.actor().to_openfga(), OpenFgaEntity::to_openfga);

        let items: Vec<_> = views_with_actions
            .iter()
            .map(|(_ns, view, a)| CheckRequestTupleKey {
                user: user.clone(),
                relation: a.to_string(),
                object: (view.warehouse_id(), view.view_id()).to_openfga(),
            })
            .collect();

        let guard_tuples = if for_user.is_some() {
            // Collect unique view objects for permission checks
            let unique_views: HashSet<_> = views_with_actions
                .iter()
                .map(|(_ns, view, _)| (view.warehouse_id(), view.view_id()).to_openfga())
                .collect();

            unique_views
                .into_iter()
                .map(|view_obj| CheckRequestTupleKey {
                    user: metadata.actor().to_openfga(),
                    relation: ViewRelation::CanReadAssignments.to_string(),
                    object: view_obj,
                })
                .collect()
        } else {
            vec![]
        };

        self.check_actions_with_permission_guard(metadata.actor(), items, guard_tuples)
            .await
    }

    async fn delete_user(
        &self,
        _metadata: &RequestMetadata,
        user_id: UserId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&user_id).await
    }

    async fn create_role(
        &self,
        metadata: &RequestMetadata,
        role_id: RoleId,
        parent_project_id: ProjectId,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();

        self.require_no_relations(&role_id).await?;
        let parent_id = parent_project_id.to_openfga();
        let this_id = role_id.to_openfga();
        self.write(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: RoleRelation::Ownership.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: parent_id.clone(),
                    relation: RoleRelation::Project.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_role(
        &self,
        _metadata: &RequestMetadata,
        role_id: RoleId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&role_id).await
    }

    async fn create_project(
        &self,
        metadata: &RequestMetadata,
        project_id: &ProjectId,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();

        self.require_no_relations(project_id).await?;
        let server = self.openfga_server().clone();
        let this_id = project_id.to_openfga();
        self.write(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: ProjectRelation::ProjectAdmin.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: server.clone(),
                    relation: ProjectRelation::Server.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: this_id,
                    relation: ServerRelation::Project.to_string(),
                    object: server,
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_project(
        &self,
        _metadata: &RequestMetadata,
        project_id: &ProjectId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(project_id).await
    }

    async fn create_warehouse(
        &self,
        metadata: &RequestMetadata,
        warehouse_id: WarehouseId,
        parent_project_id: &ProjectId,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();

        self.require_no_relations(&warehouse_id).await?;
        let project_id = parent_project_id.to_openfga();
        let this_id = warehouse_id.to_openfga();
        self.write(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: WarehouseRelation::Ownership.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: project_id.clone(),
                    relation: WarehouseRelation::Project.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: this_id.clone(),
                    relation: ProjectRelation::Warehouse.to_string(),
                    object: project_id.clone(),
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_warehouse(
        &self,
        _metadata: &RequestMetadata,
        warehouse_id: WarehouseId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&warehouse_id).await
    }

    async fn create_namespace(
        &self,
        metadata: &RequestMetadata,
        namespace_id: NamespaceId,
        parent: NamespaceParent,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();

        self.require_no_relations(&namespace_id).await?;

        let (parent_id, parent_child_relation) = match parent {
            NamespaceParent::Warehouse(warehouse_id) => (
                warehouse_id.to_openfga(),
                WarehouseRelation::Namespace.to_string(),
            ),
            NamespaceParent::Namespace(parent_namespace_id) => (
                parent_namespace_id.to_openfga(),
                NamespaceRelation::Child.to_string(),
            ),
        };
        let this_id = namespace_id.to_openfga();

        self.write(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: NamespaceRelation::Ownership.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: parent_id.clone(),
                    relation: NamespaceRelation::Parent.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: this_id.clone(),
                    relation: parent_child_relation,
                    object: parent_id.clone(),
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_namespace(
        &self,
        _metadata: &RequestMetadata,
        namespace_id: NamespaceId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&namespace_id).await
    }

    async fn create_table(
        &self,
        metadata: &RequestMetadata,
        warehouse_id: WarehouseId,
        table_id: TableId,
        parent: NamespaceId,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();
        let parent_id = parent.to_openfga();
        let this_id = (warehouse_id, table_id).to_openfga();

        // Higher consistency as for stage create overwrites old relations are deleted
        // immediately before
        self.require_no_relations(&(warehouse_id, table_id)).await?;

        self.write_higher_consistency(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: TableRelation::Ownership.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: parent_id.clone(),
                    relation: TableRelation::Parent.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: this_id.clone(),
                    relation: NamespaceRelation::Child.to_string(),
                    object: parent_id.clone(),
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_table(
        &self,
        warehouse_id: WarehouseId,
        table_id: TableId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&(warehouse_id, table_id)).await
    }

    async fn create_view(
        &self,
        metadata: &RequestMetadata,
        warehouse_id: WarehouseId,
        view_id: ViewId,
        parent: NamespaceId,
    ) -> AuthorizerResult<()> {
        let actor = metadata.actor();
        let parent_id = parent.to_openfga();
        let this_id = (warehouse_id, view_id).to_openfga();

        self.require_no_relations(&(warehouse_id, view_id)).await?;

        self.write(
            Some(vec![
                TupleKey {
                    user: actor.to_openfga(),
                    relation: ViewRelation::Ownership.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: parent_id.clone(),
                    relation: ViewRelation::Parent.to_string(),
                    object: this_id.clone(),
                    condition: None,
                },
                TupleKey {
                    user: this_id.clone(),
                    relation: NamespaceRelation::Child.to_string(),
                    object: parent_id.clone(),
                    condition: None,
                },
            ]),
            None,
        )
        .await
        .map_err(Into::into)
    }

    async fn delete_view(
        &self,
        warehouse_id: WarehouseId,
        view_id: ViewId,
    ) -> AuthorizerResult<()> {
        self.delete_all_relations(&(warehouse_id, view_id)).await
    }
}

impl OpenFGAAuthorizer {
    #[must_use]
    /// Get the `OpenFGA` object ID for the server.
    pub(crate) fn openfga_server(&self) -> String {
        self.server_id.to_openfga()
    }

    async fn list_projects_internal(
        &self,
        actor: &Actor,
    ) -> Result<ListProjectsResponse, OpenFGABackendUnavailable> {
        let list_all = self
            .check(CheckRequestTupleKey {
                user: actor.to_openfga(),
                relation: ServerRelation::CanListAllProjects.to_string(),
                object: self.openfga_server().clone(),
            })
            .await?;

        if list_all {
            return Ok(ListProjectsResponse::All);
        }

        let projects = self
            .list_objects(
                FgaType::Project.to_string(),
                CatalogProjectAction::IncludeInList.to_openfga().to_string(),
                actor.to_openfga(),
            )
            .await?
            .into_iter()
            .filter_map(|p| {
                ProjectId::parse_from_openfga(&p)
                    .inspect_err(|e| {
                        tracing::error!("{e}. Failed to parse project id from OpenFGA.");
                    })
                    .ok()
            })
            .collect::<HashSet<ProjectId>>();

        Ok(ListProjectsResponse::Projects(projects))
    }

    /// A convenience wrapper around write.
    /// All writes happen in a single transaction.
    /// At most 100 writes can be performed in a single transaction.
    pub(crate) async fn write(
        &self,
        writes: impl Into<Option<Vec<TupleKey>>>,
        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
    ) -> OpenFGAResult<()> {
        self.client.write(writes, deletes).await.inspect_err(|e| {
            tracing::error!("Failed to write to OpenFGA: {e}");
        })?;
        Ok(())
    }

    /// A convenience wrapper around write.
    /// All writes happen in a single transaction.
    /// At most 100 writes can be performed in a single transaction.
    async fn write_higher_consistency(
        &self,
        writes: impl Into<Option<Vec<TupleKey>>>,
        deletes: impl Into<Option<Vec<TupleKeyWithoutCondition>>>,
    ) -> OpenFGAResult<()> {
        self.client_higher_consistency
            .write(writes, deletes)
            .await
            .inspect_err(|e| {
                tracing::error!("Failed to write to OpenFGA: {e}");
            })?;
        Ok(())
    }

    /// A convenience wrapper around read that handles error conversion
    pub(crate) async fn read(
        &self,
        page_size: i32,
        tuple_key: impl Into<ReadRequestTupleKey>,
        continuation_token: impl Into<Option<String>>,
    ) -> OpenFGAResult<ReadResponse> {
        self.client
            .read(page_size, tuple_key, continuation_token)
            .await
            .inspect_err(|e| {
                tracing::error!("Failed to read from OpenFGA: {e}");
            })
            .map(tonic::Response::into_inner)
            .map_err(Into::into)
    }

    /// A convenience wrapper around read that handles error conversion
    async fn read_higher_consistency(
        &self,
        page_size: i32,
        tuple_key: impl Into<ReadRequestTupleKey>,
        continuation_token: impl Into<Option<String>>,
    ) -> OpenFGAResult<ReadResponse> {
        self.client_higher_consistency
            .read(page_size, tuple_key, continuation_token)
            .await
            .inspect_err(|e| {
                tracing::error!("Failed to read from OpenFGA: {e}");
            })
            .map(tonic::Response::into_inner)
            .map_err(Into::into)
    }

    /// Read all tuples for a given request
    pub(crate) async fn read_all(
        &self,
        tuple_key: Option<impl Into<ReadRequestTupleKey>>,
    ) -> OpenFGAResult<Vec<Tuple>> {
        self.client
            .read_all_pages(tuple_key, 100, 500)
            .await
            .map_err(Into::into)
    }

    /// A convenience wrapper around check
    pub(crate) async fn check(
        &self,
        tuple_key: impl Into<CheckRequestTupleKey>,
    ) -> Result<bool, OpenFGABackendUnavailable> {
        self.client
            .check(tuple_key, None, None, false)
            .await
            .inspect_err(|e| {
                tracing::error!("Failed to check with OpenFGA: {e}");
            })
            .map_err(Into::into)
    }

    /// Helper method to check actions with permission guards when inspecting another user's permissions.
    /// This pattern is used across multiple resource types (server, project, warehouse, etc.).
    ///
    /// The `items` parameter should contain the pre-built check requests for the actions.
    /// The `guard_tuples` parameter should contain permission checks to verify the actor
    /// has the right to inspect another user's permissions. If empty, no permission checks are performed.
    async fn check_actions_with_permission_guard(
        &self,
        actor: &Actor,
        mut items: Vec<CheckRequestTupleKey>,
        guard_tuples: Vec<CheckRequestTupleKey>,
    ) -> Result<Vec<bool>, IsAllowedActionError> {
        let num_guards = guard_tuples.len();

        // Collect objects for error reporting if guards fail
        let guard_objects: Vec<_> = guard_tuples.iter().map(|t| t.object.clone()).collect();

        // Append the permission guard checks
        items.extend(guard_tuples);

        let mut results = self.batch_check(items).await?;

        // If we had guard checks, pop them and verify all passed
        if num_guards > 0 {
            let guard_results: Vec<_> = results.drain(results.len() - num_guards..).collect();
            if let Some((idx, _)) = guard_results
                .iter()
                .enumerate()
                .find(|&(_, allowed)| !allowed)
            {
                return Err(
                    CannotInspectPermissions::new(actor.clone(), &guard_objects[idx]).into(),
                );
            }
        }

        Ok(results)
    }
    /// A convenience wrapper around `batch_check`.
    async fn batch_check(
        &self,
        tuple_keys: Vec<impl Into<CheckRequestTupleKey>>,
    ) -> Result<Vec<bool>, OpenFGABackendUnavailable> {
        // Using index into tuple_keys as correlation_id.
        let num_tuples = tuple_keys.len();
        let items: Vec<BatchCheckItem> = tuple_keys
            .into_iter()
            .enumerate()
            .map(|(i, tuple_key)| BatchCheckItem {
                tuple_key: Some(tuple_key.into()),
                contextual_tuples: None,
                context: None,
                correlation_id: i.to_string(),
            })
            .collect();

        let chunks: Vec<_> = items.chunks(AUTH_CONFIG.max_batch_check_size).collect();
        let chunked_raw_results =
            try_join_all(chunks.iter().map(|&c| self.client.batch_check(c.to_vec()))).await?;

        let mut results = vec![false; num_tuples];
        let mut idxs_seen = vec![false; num_tuples];
        for raw_results_chunk in chunked_raw_results {
            for (idx, check_result) in raw_results_chunk {
                let idx: usize = idx
                    .parse()
                    .map_err(|_e| UnexpectedCorrelationId::new(idx))?;
                match check_result {
                    CheckResult::Allowed(allowed) => {
                        results[idx] = allowed;
                    }
                    CheckResult::Error(e) => {
                        return Err(BatchCheckError::from(e).into());
                    }
                }
                idxs_seen[idx] = true;
            }
        }

        if !idxs_seen.iter().all(|idx_was_seen| *idx_was_seen) {
            let missing_indexes = idxs_seen
                .into_iter()
                .enumerate()
                .filter_map(|(i, seen)| if seen { None } else { Some(i) })
                .collect::<Vec<_>>();
            let err = MissingItemInBatchCheck { missing_indexes };
            return Err(err.into());
        }
        Ok(results)
    }

    pub(crate) async fn require_action(
        &self,
        metadata: &RequestMetadata,
        action: impl OpenFgaRelation,
        object: &str,
    ) -> AuthorizerResult<()> {
        let allowed = self
            .check(CheckRequestTupleKey {
                user: metadata.actor().to_openfga(),
                relation: action.to_string(),
                object: object.to_string(),
            })
            .await?;

        if !allowed {
            return Err(ErrorModel::forbidden(
                format!("Action {action} not allowed for object {object}"),
                "ActionForbidden",
                None,
            )
            .into());
        }
        Ok(())
    }

    /// Returns Ok(()) only if not tuples are associated in any relation with the given object.
    async fn require_no_relations(&self, object: &impl OpenFgaEntity) -> AuthorizerResult<()> {
        let openfga_tpye = object.openfga_type().clone();
        let fga_object = object.to_openfga();
        let objects = openfga_tpye.user_of();
        let fga_object_str = fga_object.as_str();

        // --------------------- 1. Object as "object" for any user ---------------------
        let relations_exist = self
            .client_higher_consistency
            .exists_relation_to(&fga_object)
            .await
            .map_err(|e| {
                tracing::error!("Failed to check if relations to {fga_object} exists: {e}");
                OpenFGAError::from(e)
            })?;

        if relations_exist {
            return Err(ErrorModel::conflict(
                format!("Object to create {fga_object} already has relations"),
                "ObjectHasRelations",
                None,
            )
            .into());
        }

        // --------------------- 2. Object as "user" for related objects ---------------------
        let suffixes = suffixes_for_user(&openfga_tpye);

        let futures = objects
            .iter()
            .map(|i| (i, &suffixes))
            .map(|(o, s)| async move {
                for suffix in s {
                    let user = format!("{fga_object_str}{suffix}");
                    let tuples = self
                        .read_higher_consistency(
                            1,
                            ReadRequestTupleKey {
                                user,
                                relation: String::new(),
                                object: format!("{o}:"),
                            },
                            None,
                        )
                        .await?;

                    if !tuples.tuples.is_empty() {
                        return Err(IcebergErrorResponse::from(
                            ErrorModel::conflict(
                                format!(
                                    "Object to create {fga_object_str} is used as user for type {o}",
                                ),
                                "ObjectUsedInRelation",
                                None,
                            )
                                .append_detail(format!("Found: {tuples:?}")),
                        ));
                    }
                }

                Ok(())
            })
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures).await?;

        Ok(())
    }

    async fn delete_all_relations(&self, object: &impl OpenFgaEntity) -> AuthorizerResult<()> {
        let object_openfga = object.to_openfga();
        let (own_relations, user_relations) = futures::join!(
            self.delete_own_relations(object),
            self.delete_user_relations(object)
        );
        own_relations?;
        user_relations.inspect_err(|e| {
            tracing::error!("Failed to delete user relations for {object_openfga}: {e:?}");
        })
    }

    async fn delete_user_relations(&self, user: &impl OpenFgaEntity) -> AuthorizerResult<()> {
        let user_type = user.openfga_type().clone();
        let fga_user = user.to_openfga();
        let objects = user_type.user_of();
        let fga_user_str = fga_user.as_str();

        let suffixes = suffixes_for_user(&user_type);

        let futures = objects
            .iter()
            .map(|o| (o, &suffixes))
            .map(|(o, s)| async move {
                for suffix in s {
                    let mut continuation_token = None;
                    let user = format!("{fga_user_str}{suffix}");
                    while continuation_token != Some(String::new()) {
                        let response = self
                            .read_higher_consistency(
                                MAX_TUPLES_PER_WRITE,
                                ReadRequestTupleKey {
                                    user: user.clone(),
                                    relation: String::new(),
                                    object: format!("{o}:"),
                                },
                                continuation_token.clone(),
                            )
                            .await?;
                        continuation_token = Some(response.continuation_token);
                        let keys = response
                            .tuples
                            .into_iter()
                            .filter_map(|t| t.key)
                            .collect::<Vec<_>>();
                        self.write_higher_consistency(
                            None,
                            Some(
                                keys.into_iter()
                                    .map(|t| TupleKeyWithoutCondition {
                                        user: t.user,
                                        relation: t.relation,
                                        object: t.object,
                                    })
                                    .collect(),
                            ),
                        )
                        .await?;
                    }
                }

                Result::<_, IcebergErrorResponse>::Ok(())
            })
            .collect::<Vec<_>>();

        futures::future::try_join_all(futures).await?;

        Ok(())
    }

    async fn delete_own_relations(&self, object: &impl OpenFgaEntity) -> OpenFGAResult<()> {
        let object_openfga = object.to_openfga();
        self.client_higher_consistency
            .delete_relations_to_object(&object_openfga)
            .await
            .inspect_err(|e| tracing::error!("Failed to delete relations to {object_openfga}: {e}"))
            .map_err(OpenFGAError::from)
    }

    /// A convenience wrapper around `client.list_objects`
    async fn list_objects(
        &self,
        r#type: impl Into<String>,
        relation: impl Into<String>,
        user: impl Into<String>,
    ) -> Result<Vec<String>, OpenFGABackendUnavailable> {
        let user = user.into();
        self.client
            .list_objects(r#type, relation, user, None, None)
            .await
            .map_err(Into::into)
            .map(|response| response.into_inner().objects)
    }
}

fn suffixes_for_user(user: &FgaType) -> Vec<String> {
    user.usersets()
        .iter()
        .map(|s| format!("#{s}"))
        .chain(vec![String::new()])
        .collect::<Vec<_>>()
}

#[cfg(test)]
pub(crate) mod tests {
    // Name is important for test profile
    pub(crate) mod openfga_integration_tests {
        use http::StatusCode;
        use lakekeeper::{service::authz::AuthZProjectOps, tokio};
        use openfga_client::client::ConsistencyPreference;

        use super::super::*;
        use crate::{
            client::{new_authorizer, new_client_from_default_config},
            migrate,
        };

        const TEST_CONSISTENCY: ConsistencyPreference = ConsistencyPreference::HigherConsistency;

        pub(crate) async fn new_authorizer_in_empty_store() -> OpenFGAAuthorizer {
            let client = new_client_from_default_config()
                .await
                .expect("Failed to create OpenFGA client");

            let server_id = ServerId::new_random();
            let store_name = format!("test_store_{}", uuid::Uuid::now_v7());
            migrate(&client, Some(store_name.clone()), server_id)
                .await
                .unwrap();

            new_authorizer(client, Some(store_name), TEST_CONSISTENCY, server_id)
                .await
                .unwrap()
        }

        #[tokio::test]
        async fn test_list_projects() {
            let authorizer = new_authorizer_in_empty_store().await;
            let user_id = UserId::new_unchecked("oidc", "this_user");
            let actor = Actor::Principal(user_id.clone());
            let project = ProjectId::from(uuid::Uuid::now_v7());

            let projects = authorizer
                .list_projects_internal(&actor)
                .await
                .expect("Failed to list projects");
            assert_eq!(projects, ListProjectsResponse::Projects(HashSet::new()));

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: user_id.to_openfga(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            let projects = authorizer
                .list_projects_internal(&actor)
                .await
                .expect("Failed to list projects");
            assert_eq!(
                projects,
                ListProjectsResponse::Projects(HashSet::from_iter(vec![project]))
            );
        }

        #[tokio::test]
        async fn test_require_no_relations_own_relations() {
            let authorizer = new_authorizer_in_empty_store().await;

            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: "user:this_user".to_string(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            let err = authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            assert_eq!(err.error.code, StatusCode::CONFLICT.as_u16());
            assert_eq!(err.error.r#type, "ObjectHasRelations");
        }

        #[tokio::test]
        async fn test_require_no_relations_used_in_other_relations() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: project_id.to_openfga(),
                        relation: ServerRelation::Project.to_string(),
                        object: "server:this_server".to_string(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            let err = authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            assert_eq!(err.error.code, StatusCode::CONFLICT.as_u16());
            assert_eq!(err.error.r#type, "ObjectUsedInRelation");
        }

        #[tokio::test]
        async fn test_delete_own_relations_direct() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: "user:my_user".to_string(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            authorizer.delete_own_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_own_relations_usersets() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: "role:my_role#assignee".to_string(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            authorizer.delete_own_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_own_relations_many() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            for i in 0..502 {
                authorizer
                    .write(
                        Some(vec![
                            TupleKey {
                                user: format!("user:user{i}"),
                                relation: ProjectRelation::ProjectAdmin.to_string(),
                                object: project_id.to_openfga(),
                                condition: None,
                            },
                            TupleKey {
                                user: format!("warehouse:warehouse_{i}"),
                                relation: ProjectRelation::Warehouse.to_string(),
                                object: project_id.to_openfga(),
                                condition: None,
                            },
                        ]),
                        None,
                    )
                    .await
                    .unwrap();
            }

            authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            authorizer.delete_own_relations(&project_id).await.unwrap();
            // openfga is eventually consistent, this should make tests less flaky
            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_own_relations_empty() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            authorizer.delete_own_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_user_relations() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            let project_id = ProjectId::from(uuid::Uuid::now_v7());

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: project_id.to_openfga(),
                        relation: WarehouseRelation::Project.to_string(),
                        object: "warehouse:my_warehouse".to_string(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            authorizer.delete_user_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_non_existing_relation_gives_404() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            let result = authorizer
                .write(
                    None,
                    Some(vec![TupleKeyWithoutCondition {
                        user: project_id.to_openfga(),
                        relation: WarehouseRelation::Project.to_string(),
                        object: "warehouse:my_warehouse".to_string(),
                    }]),
                )
                .await
                .unwrap_err();

            assert_eq!(
                ErrorModel::from(result).code,
                StatusCode::NOT_FOUND.as_u16()
            );
        }

        #[tokio::test]
        async fn test_duplicate_writes_give_409() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: project_id.to_openfga(),
                        relation: WarehouseRelation::Project.to_string(),
                        object: "warehouse:my_warehouse".to_string(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            let result = authorizer
                .write(
                    Some(vec![TupleKey {
                        user: project_id.to_openfga(),
                        relation: WarehouseRelation::Project.to_string(),
                        object: "warehouse:my_warehouse".to_string(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap_err();
            assert_eq!(ErrorModel::from(result).code, StatusCode::CONFLICT.as_u16());
        }

        #[tokio::test]
        async fn test_delete_user_relations_empty() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();
            authorizer.delete_user_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_user_relations_many() {
            let authorizer = new_authorizer_in_empty_store().await;
            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            authorizer.require_no_relations(&project_id).await.unwrap();

            for i in 0..502 {
                authorizer
                    .write(
                        Some(vec![
                            TupleKey {
                                user: project_id.to_openfga(),
                                relation: WarehouseRelation::Project.to_string(),
                                object: format!("warehouse:warehouse_{i}"),
                                condition: None,
                            },
                            TupleKey {
                                user: project_id.to_openfga(),
                                relation: ServerRelation::Project.to_string(),
                                object: format!("server:server_{i}"),
                                condition: None,
                            },
                        ]),
                        None,
                    )
                    .await
                    .unwrap();
            }

            authorizer
                .require_no_relations(&project_id)
                .await
                .unwrap_err();
            authorizer.delete_user_relations(&project_id).await.unwrap();
            authorizer.require_no_relations(&project_id).await.unwrap();
        }

        #[tokio::test]
        async fn test_delete_user_relations_userset() {
            let authorizer = new_authorizer_in_empty_store().await;
            let user = RoleId::new(uuid::Uuid::nil());
            authorizer.require_no_relations(&user).await.unwrap();

            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: format!("{}#assignee", user.to_openfga()),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: "project:my_project".to_string(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            authorizer.require_no_relations(&user).await.unwrap_err();
            authorizer.delete_user_relations(&user).await.unwrap();
            authorizer.require_no_relations(&user).await.unwrap();
        }

        #[tokio::test]
        async fn test_are_allowed_project_actions_without_for_user() {
            let authorizer = new_authorizer_in_empty_store().await;
            let user_id: UserId = UserId::new_unchecked("oidc", "test_user");
            let project_id = ProjectId::from(uuid::Uuid::now_v7());

            let metadata = RequestMetadata::test_user(user_id.clone());

            // Before granting any permissions, user should not have access
            let results = authorizer
                .are_allowed_project_actions_impl(
                    &metadata,
                    None,
                    &[
                        (&project_id, ProjectRelation::CanCreateWarehouse),
                        (&project_id, ProjectRelation::CanListWarehouses),
                    ],
                )
                .await
                .unwrap();
            assert_eq!(results, vec![false, false]);

            // Grant the user ProjectAdmin permission
            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: user_id.to_openfga(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            // Now user should have access to both actions
            let results = authorizer
                .are_allowed_project_actions_impl(
                    &metadata,
                    None,
                    &[
                        (&project_id, ProjectRelation::CanCreateWarehouse),
                        (&project_id, ProjectRelation::CanListWarehouses),
                    ],
                )
                .await
                .unwrap();
            assert_eq!(results, vec![true, true]);
        }

        #[tokio::test]
        async fn test_are_allowed_project_actions_with_for_user() {
            let authorizer = new_authorizer_in_empty_store().await;

            // Admin user who can check permissions
            let admin_user_id = UserId::new_unchecked("oidc", "admin_user");

            // Target user whose permissions we're checking
            let target_user_id = UserId::new_unchecked("oidc", "target_user");
            let target_user = UserOrRole::User(target_user_id.clone());

            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            let metadata = RequestMetadata::test_user(admin_user_id.clone());

            // Grant target user some permissions on the project
            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: target_user_id.to_openfga(),
                        relation: ProjectRelation::DataAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            // Admin tries to check target user's permissions without having CanReadAssignments
            // Should fail with CannotInspectPermissions
            let result = authorizer
                .are_allowed_project_actions_impl(
                    &metadata,
                    Some(&target_user),
                    &[
                        (&project_id, ProjectRelation::CanCreateWarehouse),
                        (&project_id, ProjectRelation::CanListWarehouses),
                    ],
                )
                .await;

            assert!(result.is_err());
            let err = result.unwrap_err();
            match err {
                IsAllowedActionError::CannotInspectPermissions(_) => {
                    // Expected error
                }
                IsAllowedActionError::AuthorizationBackendUnavailable(_) => {
                    panic!("Expected CannotInspectPermissions error, got: {err:?}")
                }
            }

            // Grant admin user CanReadAssignments permission on the project
            authorizer
                .write(
                    Some(vec![TupleKey {
                        user: admin_user_id.to_openfga(),
                        relation: ProjectRelation::ProjectAdmin.to_string(),
                        object: project_id.to_openfga(),
                        condition: None,
                    }]),
                    None,
                )
                .await
                .unwrap();

            // Now admin should be able to check target user's permissions
            let results = authorizer
                .are_allowed_project_actions_vec(
                    &metadata,
                    Some(&target_user),
                    &[
                        (&project_id, ProjectRelation::CanGetMetadata),
                        (&project_id, ProjectRelation::CanGrantProjectAdmin),
                    ],
                )
                .await
                .unwrap()
                .into_inner();

            assert_eq!(results, vec![true, false]);
        }

        #[tokio::test]
        async fn test_are_allowed_project_actions_for_user_checks_correct_user() {
            let authorizer = new_authorizer_in_empty_store().await;

            // Admin user who can check permissions
            let admin_user_id = UserId::new_unchecked("oidc", "admin_user");

            // Target user whose permissions we're checking
            let target_user_id = UserId::new_unchecked("oidc", "target_user");
            let target_user = UserOrRole::User(target_user_id.clone());

            let project_id = ProjectId::from(uuid::Uuid::now_v7());
            let metadata = RequestMetadata::test_user(admin_user_id.clone());

            // Grant admin user permissions on the project
            authorizer
                .write(
                    Some(vec![
                        TupleKey {
                            user: admin_user_id.to_openfga(),
                            relation: ProjectRelation::ProjectAdmin.to_string(),
                            object: project_id.to_openfga(),
                            condition: None,
                        },
                        TupleKey {
                            user: target_user.to_openfga(),
                            relation: ProjectRelation::DataAdmin.to_string(),
                            object: project_id.to_openfga(),
                            condition: None,
                        },
                    ]),
                    None,
                )
                .await
                .unwrap();

            // Check target user's permissions (not the admin's)
            // Target user has no permissions
            let results = authorizer
                .are_allowed_project_actions_vec(
                    &metadata,
                    Some(&target_user),
                    &[
                        (&project_id, ProjectRelation::CanGetMetadata),
                        (&project_id, ProjectRelation::CanGrantProjectAdmin),
                    ],
                )
                .await
                .unwrap()
                .into_inner();

            assert_eq!(results, vec![true, false]);
        }
    }
}
